package com.atguigu.app

import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import com.atguigu.bean.StartUpLog
import com.atguigu.constants.GmallConstants
import com.atguigu.handle.DauHandle
import com.atguigu.util.MyKafkaUtil
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.phoenix.spark._

/**
  * Spark Streaming job that builds the daily-active-user (DAU) detail data from start-up logs.
  *
  * For every 3-second batch the job:
  *  1. consumes records from the Kafka topic `GmallConstants.KAFKA_TOPIC_STARTUP`,
  *  2. parses each JSON record into a [[StartUpLog]] and fills in `logDate` / `logHour`,
  *  3. de-duplicates across batches (`DauHandle.filterByRedis`) and then within the
  *     batch (`DauHandle.filterByGroup`),
  *  4. passes the de-duplicated records to `DauHandle.saveToRedis`,
  *  5. writes the de-duplicated records to the Phoenix table `GMALL211126_DAU`.
  */
object DauApp {

  /**
    * Entry point: wires up the streaming pipeline and blocks until the context terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf. "local[*]" is only a fallback, so a master supplied from
    //    outside (e.g. spark-submit --master) is no longer overridden.
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("DauApp")
      .setIfMissing("spark.master", "local[*]")

    // 2. Create the StreamingContext with a 3-second batch interval.
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))

    // 3. Consume the start-up log topic from Kafka.
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(GmallConstants.KAFKA_TOPIC_STARTUP, ssc)

    // 4. Convert each JSON string into the StartUpLog case class (easier to work with)
    //    and fill in the fields that are derived from the timestamp.
    val startUpLogDStream: DStream[StartUpLog] = kafkaDStream.mapPartitions(partition => {
      // SimpleDateFormat is not thread-safe, so one instance is created per partition
      // instead of capturing a single driver-side instance in the closure.
      val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH")
      partition.map(record => {
        val startUpLog: StartUpLog = JSON.parseObject(record.value(), classOf[StartUpLog])
        // Format the timestamp as "yyyy-MM-dd HH" and split it once into the
        // date part (logDate) and the hour part (logHour).
        val dateAndHour: Array[String] = sdf.format(new Date(startUpLog.ts)).split(" ")
        startUpLog.logDate = dateAndHour(0)
        startUpLog.logHour = dateAndHour(1)
        startUpLog
      })
    })

    // Debug: print the number of raw records per batch.
//    startUpLogDStream.cache()
//    startUpLogDStream.count().print()

    // 5. De-duplicate across batches first.
    val filterByRedisDStream: DStream[StartUpLog] =
      DauHandle.filterByRedis(startUpLogDStream, ssc.sparkContext)

    // Debug: print the number of records left after cross-batch de-duplication.
//    filterByRedisDStream.cache()
//    filterByRedisDStream.count().print()

    // 6. De-duplicate within the batch.
    val filterByGroupDStream: DStream[StartUpLog] = DauHandle.filterByGroup(filterByRedisDStream)

    // This stream feeds three output operations below (saveToRedis, print, saveToPhoenix).
    // Persist it so that the Kafka read, the JSON parsing and both de-duplication steps run
    // once per batch instead of once per output operation, and so that every output
    // operation sees the same set of records.
    filterByGroupDStream.cache()

    // Debug: print the number of records left after in-batch de-duplication.
//    filterByGroupDStream.count().print()

    // 7. Save the de-duplicated mids to Redis.
    DauHandle.saveToRedis(filterByGroupDStream)

    // 8. Write the de-duplicated detail records from step 6 to HBase through Phoenix.
    filterByGroupDStream.print()
    filterByGroupDStream.foreachRDD(rdd => {
      rdd.saveToPhoenix(
        "GMALL211126_DAU",
        Seq("MID", "UID", "APPID", "AREA", "OS", "CH", "TYPE", "VS", "LOGDATE", "LOGHOUR", "TS"),
        HBaseConfiguration.create,
        Some("hadoop102,hadoop103,hadoop104:2181")
      )
    })

    // Debug: print the raw Kafka records.
//    kafkaDStream.foreachRDD(rdd=>{
//      rdd.foreach(record=>{
//        println(record.value())
//      })
//    })

    // Start the streaming computation and block until it is stopped.
    ssc.start()
    ssc.awaitTermination()
  }
}
